iT邦幫忙

2025 iThome 鐵人賽

DAY 6
3
AI & Data

「知其然,更知其所以然:什麼是 Real-time (Streaming) Pipeline?從造輪子到 Flink 與 RisingWave」系列 第 6

【知其然,更知其所以然】Day 6:批量寫入的威力 - 讓 Kafka Consumer 暢行無阻

  • 分享至 

  • xImage
  •  

https://ithelp.ithome.com.tw/upload/images/20250826/20124758oe1debERdH.jpg
在資料流處理的世界裡,很多人覺得「即時」等於「一條一條馬上寫入 DB」。但這就好比外送員每接到一單,就立刻停下手上的工作、騎車送到客人家 - 送得慢、油耗高,還容易堵在半路。

我們在 Day 5 說過,一個更聰明的做法,是先過濾、後入庫,再批量寫入 DB。今天,我們就來聊聊批量寫入最佳實踐。

打造支援 Batch 的 PostgreSQL Sink

重要提醒
本文所有程式碼皆為教學與概念演示用的 Pseudo Code,可以幫助你理解並解決實際場景的問題,但這裡的程式碼不能直接使用,主要目的是用來講解 Stream Processing 的設計思路與核心概念。閱讀時建議把重點放在理解整體架構和設計邏輯上,程式碼細節部分可以輕鬆看過就好。

我們在前面提到,批量寫入(batch insert)能大幅提升吞吐與效能。下面這段程式碼是一個 SimplePostgreSQLSink 的實作,它會:

  • 接收從 upstream 傳來的資料(如 Kafka Consumer 處理後)
  • 暫存至 buffer 中
  • 當累積達到設定的 batch_size,就一次性寫入資料庫

初始化與緩衝區設計

init 中,我們初始化了用來暫存資料的 _buffer:

class SimplePostgreSQLSink(BaseSink):
    """
    支援 batch 寫入的 PostgreSQL Sink
    """
    def __init__(self, batch_size: int = 100):
        self.batch_size = batch_size
        self.connection = None  # DB 連接
        
        # Batch 相關
        self._buffer: List[Dict[str, Any]] = []

寫入邏輯:write()

這是對外提供的主要入口,每當收到一筆資料時:

  1. 加入 buffer
  2. 若筆數達 batch_size 就批量寫入
    ┌─────────────────────┐
    │     Buffer          │
    │                     │
    │ [data1, data2, ...] │
    └─────────────────────┘
           │
           ▼
    ┌──────────────────────┐
    │   Count Trigger      │
    │                      │
    │   size ≥ 100?        │
    │   Yes → Flush to DB  │
    └──────────────────────┘
def write(self, message: Dict[str, Any]):
    """
    將 message 加入 buffer,達到 batch_size 時批次寫入
    """
    # 從 message 取得實際資料
    data = message.get('value', {})
    if not data:
        return
    
    # 加入 buffer
    self._buffer.append(data)
    
    # 達到批次大小,批量寫入
    if len(self._buffer) >= self.batch_size:
        # buffer 滿了,批量寫入
        # ... 執行批量插入
        self._buffer.clear()  # 清空 buffer

核心特色:累積到指定筆數後,批量寫入資料庫。

定時器機制:解決「慢滿批次」的問題

如果資料流入速度很慢,buffer 可能很久才滿,這樣就失去了即時性。

解決方案:除了「筆數觸發」,還要加上「時間觸發」。

    ┌─────────────────────┐
    │     Buffer          │
    │                     │
    │ [data1, data2, ...] │
    └─────────────────────┘
           │         │
           │         │
           ▼         ▼
    ┌──────────┐   ┌──────────┐
    │  Count   │   │  Timer   │
    │ Trigger  │   │ Trigger  │
    │          │   │          │
    │size ≥ 100│   │100ms tick│
    └──────────┘   └──────────┘
           │         │
           └─────┬───┘
                 │
                 ▼
    ┌─────────────────────┐
    │    Flush to DB      │
    │                     │
    │ INSERT batch data   │
    │ Clear buffer        │
    └─────────────────────┘
def __init__(self, batch_size: int = 100):
    self.batch_size = batch_size
    self._buffer = []
    self._start_timer()  # 啟動定時器

def write(self, message):
    data = message.get('value', {})
    self._buffer.append(data)
    
    # 筆數觸發:滿了就寫入
    if len(self._buffer) >= self.batch_size:
        self._flush_and_clear()

def _start_timer(self):
    # 每 100ms 檢查一次是否需要 flush
    timer = threading.Timer(0.1, self._timer_flush)
    timer.start()

def _timer_flush(self):
    if self._buffer:  # 有資料就寫入
        self._flush_and_clear()
    self._start_timer()  # 重新啟動計時器

核心概念:雙重觸發機制,確保資料既能批量處理,又不會延遲太久。

總結

批量寫入是 Stream Processing 中的核心優化技術,它解決了「一筆一筆寫入」帶來的性能瓶頸:

批量寫入的核心優勢:

  • 減少 DB 壓力:從 1000 次 INSERT 變成 10 次批量 INSERT
  • 提升吞吐量:批量操作比逐筆操作效率高 10-100 倍
  • 降低網路開銷:減少與資料庫的通訊次數

雙重觸發機制的價值:

  • 筆數觸發:確保高流量時的批量效率
  • 時間觸發:確保低流量時的即時性
  • 平衡點:在吞吐量與延遲之間找到最佳平衡

在雙 11、黑五這種訂單洪峰時,這種設計能讓系統從容應對,而不是被一條條寫入拖垮。同時在深夜低峰期,也不會讓用戶等太久才看到資料更新。

業界實例:批量寫入無所不在

這種批量寫入設計並非我們的創新,而是業界的標準做法:

知名 Streaming Engine:

  • Apache Flink:支援批量 Sink,可設定 batch.sizebatch.interval
  • Apache Kafka Streams:內建批量提交機制

Database Client 也都有:

  • JDBCexecuteBatch() 方法
  • PostgreSQLCOPY 指令和 execute_values()
  • MongoDBinsertMany() 批量插入

原因很簡單:這是性能優化的必然選擇,任何高性能系統都會採用類似設計。

下一步思考:當 Consumer 處理速度跟不上 Producer 時會發生什麼?這就帶出了「背壓與流量控制」的重要性。

Day 7 預告:背壓與流量控制

想像你的 SimpleStreamEngine 每秒處理 1000 筆資料,但資料庫寫入已經優化到極限,硬體瓶頸讓它只能每秒寫入 500 筆。多出來的 500 筆去哪了?堆積在記憶體裡,直到系統 OOM 崩潰!

明天我們會探討:

  • 背壓(Backpressure) 如何拯救你的系統

上一篇
【知其然,更知其所以然】Day 5: Speed Layer 效能瓶頸與優化
下一篇
【知其然,更知其所以然】Day 7: Backpressure
系列文
「知其然,更知其所以然:什麼是 Real-time (Streaming) Pipeline?從造輪子到 Flink 與 RisingWave」15
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

1 則留言

0
bennyxu0624
iT邦新手 5 級 ‧ 2025-08-27 00:22:17

真期待介紹背壓概念!

wudihero2 iT邦新手 5 級 ‧ 2025-08-27 00:26:12 檢舉

/images/emoticon/emoticon13.gif

我要留言

立即登入留言